xds/internal/client: Add async gauge metrics for Connected and Resources (A78)#8807
xds/internal/client: Add async gauge metrics for Connected and Resources (A78)#8807mbissa wants to merge 8 commits intogrpc:masterfrom
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #8807 +/- ##
==========================================
- Coverage 83.26% 83.07% -0.19%
==========================================
Files 418 411 -7
Lines 33004 33025 +21
==========================================
- Hits 27480 27437 -43
- Misses 4109 4193 +84
+ Partials 1415 1395 -20
🚀 New features to boost your workflow:
|
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces asynchronous gauge metrics for tracking xDS client connectivity and resource counts, which is a valuable addition for observability. The implementation looks mostly solid, but I've found a critical issue in the connectivity metric logic that causes it to report incorrect status. I've also included a couple of medium-severity suggestions to improve test coverage and code clarity. Please see the detailed comments below.
| default: | ||
| // Fallback for initialization states | ||
| return "requested" | ||
| } |
There was a problem hiding this comment.
I don't see this being specified in the gRFC. Am I missing something?
There was a problem hiding this comment.
We are handling that as part of the case for xdsresource.ServiceStatusRequested, right?
916bd0e to
30b9c73
Compare
sequenceDiagram
participant OS as "OTel/Stats System"
participant Rec as "MetricsRecorder (Core)"
participant MR as "MetricsReporter (Adapter)"
participant XC as "XDSClient"
participant AS as "ADS Stream"
Note over XC, Rec: Phase 1: Registration (One-time)
XC->>MR: RegisterAsyncReporter(reporter)
MR->>Rec: RegisterAsyncReporter(hook)
Rec-->>XC: metricsCleanup func
Note over XC, AS: Phase 2: Synchronous Events & ADS Lifecycle
XC->>AS: WatchResource(lds:foo)
AS->>AS: NewStream() Success
AS->>XC: established = true
AS->>XC: Recv(Response: lds:foo)
XC->>XC: Cache Update (Requested -> ACKed)
AS->>AS: Recv() Error (Stream Breaks)
AS->>XC: established = false
alt if legitimate (gRFC A9)
AS->>MR: (No Counter)
else non-legitimate
AS->>MR: ReportMetric(ServerFailure)
end
MR->>Rec: Record Counter
Note over OS, XC: Phase 3: Asynchronous Scrape (Periodic)
OS->>Rec: Scrape Metrics
Rec->>MR: Trigger Hook
MR->>XC: Report(AsyncMetricsRecorder)
XC->>XC: reportConnectedState() --> 1 if established, else 0
XC->>XC: reportResourceStats() --> counts by (Type, CacheState)
XC-->>OS: Return Gauge Values (via Core)
Note over XC, Rec: Phase 4: Cleanup
XC->>Rec: metricsCleanup()
|
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces asynchronous gauge metrics for tracking the xDS client's connection status and the number of cached xDS resources, as specified in gRFC A78. The implementation is well-structured, introducing new interfaces for asynchronous metric reporting and ensuring proper lifecycle management. The accompanying tests are thorough and cover various states and scenarios for the new metrics. I have a couple of minor suggestions to improve variable naming for better code clarity and maintainability.
| default: | ||
| // Fallback for initialization states | ||
| return "requested" | ||
| } |
There was a problem hiding this comment.
We are handling that as part of the case for xdsresource.ServiceStatusRequested, right?
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
7582c77 to
720d321
Compare
720d321 to
f397493
Compare
| // XDSClientConnected reports the connectivity state of the ADS stream. | ||
| // Per gRFC A78, Value is 1 if connected, 0 otherwise. | ||
| // Labels: grpc.target, grpc.xds.server | ||
| // grpc.target is added by asyncMetricsRecorderAdapter |
There was a problem hiding this comment.
This docstring is eventually for user consumption (although currently it is under a top-level internal package). Why does it matter to the user if grpc.target is added by asyncMetricsRecorderAdapter?
Also, please take a look at go/go-style/decisions#comment-sentences.
The above comments apply to here and to XDSClientResourceStats.
There was a problem hiding this comment.
I added that because I misunderstood an earlier comment. fixed it now.
| firstRequest bool // False after the first request is sent out. | ||
| firstStreamCreated bool // Set to true after the very first ADS stream is created. | ||
| streamEstablished bool // Set to true when an ADS stream is established and a response is received, except for the very first stream which is set to true immediately. |
There was a problem hiding this comment.
What do you think about making these three fields be of type atomic.Bool and not have to acquire the lock to read/write them. Currently, there seems to be too much of locking and unlocking.
There was a problem hiding this comment.
Great suggestion! done.
internal/xds/xdsclient/clientimpl.go
Outdated
| @@ -75,6 +75,20 @@ var ( | |||
| Labels: []string{"grpc.target", "grpc.xds.server"}, | |||
| Default: false, | |||
| }) | |||
| xdsClientConnectedMetric = estats.RegisterInt64AsyncGauge(estats.MetricDescriptor{ | |||
| Name: "grpc.xds_client.connected", | |||
| Description: "A metric that is 1 if the xDS Client is connected to an xDS server, 0 otherwise.", | |||
There was a problem hiding this comment.
Nit: Similar comment about the description that it is not necessarily whether the xDS client is connected to the server or not, but more about whether it has a working ADS stream to the server.
| // documentation for a list of possible metrics events. | ||
| ReportMetric(metric any) | ||
|
|
||
| // RegisterAsyncReporter registers a reporter to produce metric values for |
There was a problem hiding this comment.
Nit in google3. internal packages are not the same in google3.
| r.mu.Lock() | ||
| defer r.mu.Unlock() | ||
| r.asyncReporters[reporter] = struct{}{} | ||
| return func() { |
There was a problem hiding this comment.
Please return a onceFunc, so that it is idempotent.
| return nil | ||
| } | ||
| // Continue if mismatch. | ||
|
|
There was a problem hiding this comment.
removed the comment and newline as docstring is added now.
| return nil | ||
| } | ||
|
|
||
| func (r *testMetricsReporter) waitForSpecificMetric(ctx context.Context, metricsDataWant any) error { |
There was a problem hiding this comment.
We already have a waitForMetric that simply reads the first metric data out of the channel and compares it with want. It is well documented. There is no documentation for this function. Why do we need this? Why does the above one not suffice? Under what circumstances would one use this? All this needs to be clarified in the docstring.
There was a problem hiding this comment.
added doc string. We need this when the first metric data out of the channel is not the one we want and sequence is not guaranteed.
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
| defer cancel() | ||
|
|
||
| tmr := newTestMetricsReporter() |
There was a problem hiding this comment.
Nit: Please create this right before it is used.
Is there any reason for this to be created upfront here?
| resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType} | ||
| si := clients.ServerIdentifier{ | ||
| ServerURI: mgmtServer.Address, | ||
| Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"}, | ||
| } | ||
| configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} |
There was a problem hiding this comment.
Since none of these vars are used anywhere outside of the literal struct initialization for xdsClientConfig, I would recommend that these be inlined. The literal struct initialization syntax in Go is very readable when correctly used. See: go/go-style/decisions#literal-formatting
| t.Fatalf("Failed to verify nacked count: %v", err) | ||
| } | ||
|
|
||
| // Wait for the does_not_exist state. |
There was a problem hiding this comment.
How does this work? The xDS client uses a default watch timeout of 15s? How is the resource-not-found triggered in this test?
| OnStreamOpen: func(_ context.Context, _ int64, _ string) error { | ||
| return nil | ||
| }, | ||
| OnStreamRequest: func(_ int64, _ *v3discoverypb.DiscoveryRequest) error { | ||
| return nil | ||
| }, |
There was a problem hiding this comment.
If these are not used, there is no need for them to be defined as empty funcs.
| unblock := make(chan struct{}) | ||
|
|
||
| customGRPCNewClient := func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { | ||
| interceptor := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, cops ...grpc.CallOption) (grpc.ClientStream, error) { |
There was a problem hiding this comment.
There is a lot happening here. Please add comments to make it easier for the readers.
| // 5. NewStream OK - metric value 0 | ||
| lis.Restart() | ||
|
|
||
| time.Sleep(5 * time.Second) |
There was a problem hiding this comment.
We can't sleep in our tests like this. It unnecessarily increases the runtime of tests. Please figure out a way to wait for the event that you need to wait for, and then proceed.
| defer sCancel() | ||
| got, err := tmr.metricsCh.Receive(sCtx) | ||
| if err != nil { | ||
| t.Fatalf("Step 5 failed: Timeout waiting for XDSClientConnected metric: %v", err) |
There was a problem hiding this comment.
Someone reading logs from a test failure should be able to reasonably understand why the test is failing. Things like "Step 5" does not help in that respect.

This PR leverages the async gauge framework and implements the xdsclient metrics to report number of xds resources and whether or not the xDS client currently has a working ADS stream to the xDS server along with the required labels as part of A78
RELEASE NOTES: